
Add Pipeline CRD for Redpanda Connect pipeline management#1337

Open
david-yu wants to merge 65 commits into main from feat/connect-crd

Conversation

@david-yu
Contributor

@david-yu david-yu commented Mar 23, 2026

Summary

Adds a Pipeline CRD (cluster.redpanda.com/v1alpha2) that manages Redpanda Connect pipelines as first-class Kubernetes resources. The spec is statically typed against the same Kubernetes-native primitives the rest of the v1alpha2 CRDs already use:

  • cluster (*ClusterSource) — same primitive Topic/User use. Either point at a Redpanda CR (clusterRef) or supply brokers/TLS/SASL inline (staticConfiguration). When set, the operator inline-merges seed_brokers, tls, and sasl into any input.redpanda / output.redpanda blocks the user wrote in configYaml. The non-deprecated redpanda plugin family is the merge target; the legacy redpanda_common is not auto-configured.

  • userRef — optional alongside cluster.clusterRef. Binds the pipeline to a User CR; the operator reads the User's Secret-backed password + SCRAM mechanism and uses User.metadata.name as the SASL username. The User CR stays user-managed so ACL scoping is auditable (operator does not auto-create or modify it). Omit for unauthenticated clusters.

    Why userRef is flat while cluster.clusterRef is wrapped. cluster is a ClusterSource — a discriminated union of two sources: point at a Redpanda CR (clusterRef: { name }) OR supply brokers/TLS/SASL inline (staticConfiguration: {...}). The wrapping exists to express that union, and it matches the existing Topic/User CRDs (spec.cluster: *ClusterSource). userRef has only one source today — point at a User CR by name — so it stays flat: userRef: { name }. The inline-SASL counterpart already lives at cluster.staticConfiguration.kafka.sasl.{mechanism,username,password} (also flat — also a single source), and CEL forbids combining it with userRef. If a future use case needs a second user-identity source (e.g., inline SASL without a User CR backing it), we can promote userRef into a user: UserSource wrapper at that point.

  • serviceAccountName — the ServiceAccount bound to the pipeline pod. When unset, the namespace's default SA is used. Set this to scope cloud-IAM trust (IRSA on EKS, Workload Identity on GKE, Pod Identity on AKS) per-pipeline rather than sharing the namespace's default SA across every pipeline. The operator does not create the SA — provision it (with the cloud-IAM annotations) out-of-band.

  • valueSources — typed list of named env-var projections (one named pull per entry) backed by inline / configMapKeyRef / secretKeyRef / externalSecretRef. Replaces the earlier secretRef[] env-splat and env[] raw corev1.EnvVar approaches.

  • image — per-pipeline Connect runtime override. Three-tier precedence: Pipeline.spec.image wins, then the chart-level connectController.image.{repository,tag} default (plumbed in via the operator's --connect-default-image flag), then the binary-baked PipelineDefaultImage constant.

  • configYaml — the user's Connect pipeline YAML. Stays the inline catch-all for anything the typed fields don't cover.
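The image bullet above reduces to a first-non-empty lookup across the three tiers. A minimal sketch, with illustrative names (the lowercase pipelineDefaultImage here stands in for the binary-baked PipelineDefaultImage constant; this is not the operator's actual code):

```go
package main

import "fmt"

// pipelineDefaultImage stands in for the binary-baked constant
// (illustrative name and value, not the real ones).
const pipelineDefaultImage = "docker.redpanda.com/redpandadata/connect:latest"

// resolveImage sketches the three-tier precedence: per-pipeline
// spec.image wins, then the chart-level default plumbed in via
// --connect-default-image, then the baked-in constant.
func resolveImage(specImage, chartDefault string) string {
	if specImage != "" {
		return specImage
	}
	if chartDefault != "" {
		return chartDefault
	}
	return pipelineDefaultImage
}

func main() {
	// Per-pipeline override beats the chart default.
	fmt.Println(resolveImage("ghcr.io/acme/connect:v4", "docker.redpanda.com/redpandadata/connect:4.39"))
	// No override: chart default applies.
	fmt.Println(resolveImage("", "docker.redpanda.com/redpandadata/connect:4.39"))
	// Nothing configured: baked-in constant.
	fmt.Println(resolveImage("", ""))
}
```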

CEL on PipelineSpec enforces the contract: userRef is forbidden alongside cluster.staticConfiguration (the static path carries its own inline SASL), and userRef is forbidden without cluster.clusterRef (no cluster context to authenticate against otherwise). userRef is otherwise an opt-in for SASL-enabled clusters.
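For illustration, the two rules could be rendered as x-kubernetes-validations on the spec schema (a hypothetical sketch — the actual expressions live as CEL markers on PipelineSpec and may be phrased differently):

```yaml
x-kubernetes-validations:
  - rule: "!has(self.userRef) || !has(self.cluster.staticConfiguration)"
    message: "userRef may not be combined with cluster.staticConfiguration"
  - rule: "!has(self.userRef) || has(self.cluster.clusterRef)"
    message: "userRef requires cluster.clusterRef"
```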

Worked examples

A. Cluster-bound — Pipeline points at a Redpanda CR on the same Kubernetes cluster

The user provisions a SCRAM User CR with ACLs scoped to what the pipeline reads/writes; the Pipeline then references both the cluster and the user.

User CR (separate manifest, owns the SCRAM identity + ACLs):

apiVersion: cluster.redpanda.com/v1alpha2
kind: User
metadata:
  name: orders-to-warehouse
  namespace: redpanda
spec:
  cluster:
    clusterRef:
      name: redpanda
  authentication:
    type: scram-sha-512
    password:
      valueFrom:
        secretKeyRef:
          name: orders-to-warehouse-password
          key: password
  authorization:
    acls:
      - type: allow
        resource: { type: topic, name: orders, patternType: literal }
        operations: [Read, Describe]
      - type: allow
        resource: { type: group, name: orders-to-warehouse-ingest, patternType: literal }
        operations: [Read, Describe]

Pipeline CR:

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata:
  name: orders-to-warehouse
  namespace: redpanda
spec:
  cluster:
    clusterRef:
      name: redpanda             # operator resolves brokers + TLS
  userRef:
    name: orders-to-warehouse    # operator reads password Secret + mechanism

  valueSources:
    - name: S3_SECRET_KEY
      source:
        secretKeyRef: { name: s3-creds, key: secret_access_key }

  configYaml: |
    input:
      redpanda:
        # Only the per-plugin fields — seed_brokers, tls, sasl are
        # filled in by the operator from clusterRef + userRef.
        topics: [orders]
        consumer_group: orders-to-warehouse-ingest
    pipeline:
      processors:
        - mapping: |
            root = this
            root.ingested_at = now()
    output:
      aws_s3:
        bucket: warehouse-orders
        region: us-east-2
        credentials:
          secret: ${S3_SECRET_KEY}

What the operator renders into the pod's /config/connect.yaml:

input:
  redpanda:
    seed_brokers:                                      # injected
      - redpanda-0.redpanda.redpanda.svc.cluster.local.:9093
      - redpanda-1.redpanda.redpanda.svc.cluster.local.:9093
      - redpanda-2.redpanda.redpanda.svc.cluster.local.:9093
    tls:                                               # injected
      enabled: true
      root_cas_file: /etc/tls/certs/ca/ca.crt
    sasl:                                              # injected
      - mechanism: SCRAM-SHA-512
        username: ${REDPANDA_SASL_USERNAME}
        password: ${REDPANDA_SASL_PASSWORD}
    topics: [orders]                                   # user-supplied
    consumer_group: orders-to-warehouse-ingest         # user-supplied
pipeline:
  processors:
    - mapping: |
        root = this
        root.ingested_at = now()
output:
  aws_s3:
    bucket: warehouse-orders
    region: us-east-2
    credentials:
      secret: ${S3_SECRET_KEY}

Pod env (auto-derived):

  • REDPANDA_SASL_USERNAME = "orders-to-warehouse" (literal, from User.metadata.name)
  • REDPANDA_SASL_MECHANISM = "SCRAM-SHA-512" (literal, from User.spec.authentication.type)
  • REDPANDA_SASL_PASSWORD → secretKeyRef: { orders-to-warehouse-password, password } (from User.spec.authentication.password.valueFrom.secretKeyRef)
  • S3_SECRET_KEY → secretKeyRef: { s3-creds, secret_access_key } (from valueSources)

User keys win on conflict. If the user had written seed_brokers: [external.example.com:9093] inside input.redpanda, the operator would have left that value untouched and only injected the missing tls and sasl keys. That's the escape hatch for cluster-bound pipelines that need to point a specific input/output at a different cluster.
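That contract (inject operator-derived keys, never overwrite a user-supplied one) amounts to a shallow map merge in which user keys win. A minimal sketch under that assumption, not the operator's actual implementation:

```go
package main

import "fmt"

// mergeRedpandaBlock sketches the inline-merge contract: operator-derived
// connection keys (seed_brokers, tls, sasl) are injected into a user-written
// input.redpanda / output.redpanda block, but any key the user already set
// is left untouched.
func mergeRedpandaBlock(user, injected map[string]any) map[string]any {
	out := make(map[string]any, len(user)+len(injected))
	for k, v := range injected {
		out[k] = v
	}
	for k, v := range user { // user keys win on conflict
		out[k] = v
	}
	return out
}

func main() {
	user := map[string]any{
		"seed_brokers": []string{"external.example.com:9093"}, // explicit override
		"topics":       []string{"orders"},
	}
	injected := map[string]any{
		"seed_brokers": []string{"redpanda-0.redpanda.redpanda.svc.cluster.local.:9093"},
		"tls":          map[string]any{"enabled": true},
	}
	merged := mergeRedpandaBlock(user, injected)
	fmt.Println(merged["seed_brokers"]) // the user's brokers survive
	fmt.Println(merged["tls"])          // the missing tls key is injected
}
```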

B. External Kafka / BYOC — static configuration

For pipelines reaching an external Redpanda or Kafka the operator doesn't run (Redpanda Cloud BYOC, cross-region tap, Confluent Cloud, MSK, etc.). No userRef; SASL credentials live inline on the static config and the password is itself a ValueSource.

Pipeline CR:

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata:
  name: cross-region-mirror
  namespace: redpanda
spec:
  cluster:
    staticConfiguration:
      kafka:
        brokers:
          - kafka.us-east.example.com:9094
        tls:
          enabled: true
          caCertSecretRef:
            name: external-kafka-ca
            key: ca.crt
        sasl:
          mechanism: SCRAM-SHA-512
          username: pipeline-mirror-svc
          password:
            secretKeyRef:
              name: external-kafka-creds
              key: password
  # No userRef — staticConfiguration carries its own SASL identity.
  configYaml: |
    input:
      redpanda:
        topics: [orders]
    output:
      redpanda:
        topic: orders.mirrored

What the operator renders:

input:
  redpanda:
    seed_brokers: [kafka.us-east.example.com:9094]     # injected
    tls:                                               # injected
      enabled: true
      root_cas_file: /etc/tls/certs/ca/ca.crt
    sasl:                                              # injected
      - mechanism: SCRAM-SHA-512
        username: ${REDPANDA_SASL_USERNAME}
        password: ${REDPANDA_SASL_PASSWORD}
    topics: [orders]                                   # user-supplied
output:
  redpanda:
    seed_brokers: [kafka.us-east.example.com:9094]     # injected
    tls: { enabled: true, root_cas_file: /etc/tls/certs/ca/ca.crt }
    sasl:                                              # injected
      - mechanism: SCRAM-SHA-512
        username: ${REDPANDA_SASL_USERNAME}
        password: ${REDPANDA_SASL_PASSWORD}
    topic: orders.mirrored                             # user-supplied

staticConfiguration and clusterRef produce the same inline-merge contract — only the source-of-truth for the connection fields differs. No User CR involved.

C. Per-pipeline IRSA — native RDS IAM database authentication via serviceAccountName

Pipeline binds to a Redpanda cluster for its output and to RDS for its CDC input. The pipeline pod itself calls AWS APIs (rds:GenerateDBAuthToken) using an IAM role assumed via IRSA. serviceAccountName scopes that trust to this one Pipeline — no other workload in the redpanda namespace can assume the role.

Out-of-band: ServiceAccount with the IRSA annotation (terraform / pulumi / a separate manifest — the operator does not create it):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: mysql-cdc-orders-rds
  namespace: redpanda
  annotations:
    # The role's trust policy is scoped to
    # system:serviceaccount:redpanda:mysql-cdc-orders-rds — i.e. this
    # exact SA. Its inline policy grants rds-db:connect on
    # arn:aws:rds-db:us-east-2:<acct>:dbuser:<DbiResourceId>/cdc.
    eks.amazonaws.com/role-arn: arn:aws:iam::605419575229:role/mysql-cdc-orders-rds

Pipeline CR:

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata:
  name: mysql-cdc-orders
  namespace: redpanda
spec:
  cluster:
    clusterRef: { name: redpanda }
  userRef:
    name: mysql-cdc-orders-svc       # SASL identity for output
  serviceAccountName: mysql-cdc-orders-rds   # IRSA boundary for AWS

  valueSources:
    # No MYSQL_PASSWORD — IAM auth supplies it on the fly.
    - name: MYSQL_HOST
      source:
        secretKeyRef: { name: mysql-cdc-creds, key: host }
    - name: MYSQL_USER
      source:
        secretKeyRef: { name: mysql-cdc-creds, key: username }

  configYaml: |
    input:
      mysql_cdc:
        # IAM auth: Connect calls rds:GenerateDBAuthToken using the
        # pod's IRSA-assumed role and uses the 15-min token as the
        # MySQL password. allowCleartextPasswords=1 is required so the
        # Go MySQL driver sends the token (which it sees as a plaintext
        # password) over the TLS-protected connection.
        dsn: "${MYSQL_USER}@tcp(${MYSQL_HOST}:3306)/shop?tls=skip-verify&allowCleartextPasswords=1"
        aws:
          enabled: true
          region: us-east-2
          endpoint: "${MYSQL_HOST}:3306"
        stream_snapshot: true
        tables: [orders]
        flavor: mysql
    pipeline:
      processors:
        - mapping: |
            root = this
            root.cdc_received_at = now()
    output:
      redpanda:                       # only the per-plugin fields
        topic: mysql.shop.orders
        key: '${! @table }'

The pipeline pod assumes mysql-cdc-orders-rds (and only that role) via the projected service-account token at /var/run/secrets/eks.amazonaws.com/serviceaccount/token. The MySQL connection uses an IAM-generated token; no MySQL password lives anywhere in the pod, the Pipeline CR, or a Secret.

This is the production K8s-RDS pattern: IRSA gates AWS API access (so the pipeline can mint MySQL tokens); Pipeline userRef gates Redpanda access (so the pipeline can write its output topic). The two trust boundaries are orthogonal.

D. Inline — pipeline references multiple external sources via valueSources

For pipelines whose primary connection isn't Redpanda at all, or that fan out to multiple non-Kafka backends. valueSources is destination-agnostic: each entry is a typed pull from inline / Secret / ConfigMap / ExternalSecret and projects to an env var the YAML references via ${NAME}. Connect plugins read those env vars however they expose credentials.

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata:
  name: orders-fanout
  namespace: redpanda
spec:
  cluster:
    clusterRef: { name: redpanda }
  userRef:
    name: orders-fanout-svc

  valueSources:
    # MongoDB Atlas — full connection URI from a Secret managed by the
    # team that provisions the cluster.
    - name: MONGO_URI
      source:
        secretKeyRef:
          name: mongo-orders-atlas
          key: connection_uri          # mongodb+srv://user:pass@cluster.../...

    # Snowflake key-pair auth — account/user inline; private key from a
    # Secret; passphrase via external-secrets.io (1Password / Vault / etc.).
    - name: SNOWFLAKE_ACCOUNT
      source: { inline: "ab12345.us-east-1" }
    - name: SNOWFLAKE_USER
      source: { inline: "PIPELINE_SVC" }
    - name: SNOWFLAKE_PRIVATE_KEY
      source:
        secretKeyRef:
          name: snowflake-pipeline-svc
          key: rsa_key.p8
    - name: SNOWFLAKE_PRIVATE_KEY_PASSPHRASE
      source:
        externalSecretRef:
          name: snowflake-pipeline-svc-passphrase

    # MySQL — DSN composed from a Secret-backed password and a ConfigMap-
    # provided host so app teams can rotate the read-replica endpoint
    # without touching Pipeline CRs.
    - name: MYSQL_USER
      source: { inline: "warehouse_writer" }
    - name: MYSQL_PASSWORD
      source:
        secretKeyRef:
          name: mysql-warehouse-creds
          key: password
    - name: MYSQL_HOST
      source:
        configMapKeyRef:
          name: warehouse-env
          key: mysql_replica_host

  configYaml: |
    input:
      redpanda:
        # Only the per-plugin fields — seed_brokers / tls / sasl
        # injected by the operator from clusterRef + userRef.
        topics: [orders]
        consumer_group: orders-fanout

    output:
      broker:
        outputs:
          - mongodb:
              url: ${MONGO_URI}
              database: orders
              collection: ingested
              operation: insert-one

          - snowflake_put:
              account: ${SNOWFLAKE_ACCOUNT}
              user: ${SNOWFLAKE_USER}
              private_key: ${SNOWFLAKE_PRIVATE_KEY}
              private_key_pass: ${SNOWFLAKE_PRIVATE_KEY_PASSPHRASE}
              database: WAREHOUSE
              schema: PUBLIC
              stage: "@PIPELINE_STAGE"

          - sql_insert:
              driver: mysql
              dsn: "${MYSQL_USER}:${MYSQL_PASSWORD}@tcp(${MYSQL_HOST}:3306)/warehouse?parseTime=true"
              table: orders
              columns: [order_id, ingested_at, payload]
              args_mapping: |
                root = [ this.id, this.ingested_at, this.format_json() ]

Properties this design intentionally preserves:

  • Plugin-agnostic: the inline-merge only touches input.redpanda and output.redpanda blocks. New Connect plugins ship in future Connect releases without any operator change; non-redpanda blocks (mongodb, snowflake_put, sql_insert, aws_s3, mysql_cdc, etc.) pass through untouched.
  • Mixed sources per pipeline: inline / secretKeyRef / configMapKeyRef / externalSecretRef can mix freely across entries in the same valueSources list.
  • Per-key, not per-Secret: unlike the earlier secretRef[] env-splat, every value is a named pull. Unused keys in a Secret don't leak into the pod env, the env name and the Secret's key can differ, and multiple pipelines can pull non-overlapping keys from the same Secret.
  • Orthogonal trust boundaries: cluster + userRef gates Redpanda access; serviceAccountName gates cloud-IAM access. Either can be set without the other.
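The per-key property can be sketched as a one-entry-to-one-env-var projection. The types below are trimmed and hypothetical (the real CRD uses corev1-style selectors and also supports externalSecretRef); the point is only that each entry yields exactly one named value:

```go
package main

import "fmt"

// ValueSource is a trimmed stand-in for the union described above.
type ValueSource struct {
	Name   string
	Inline string  // literal value
	Secret *KeyRef // one key from a Secret
	Config *KeyRef // one key from a ConfigMap
}

type KeyRef struct{ Name, Key string }

// project renders one entry as one env-var binding; nothing else from
// the referenced Secret/ConfigMap reaches the pod environment.
func project(vs ValueSource) string {
	switch {
	case vs.Secret != nil:
		return fmt.Sprintf("%s <- secret %s/%s", vs.Name, vs.Secret.Name, vs.Secret.Key)
	case vs.Config != nil:
		return fmt.Sprintf("%s <- configmap %s/%s", vs.Name, vs.Config.Name, vs.Config.Key)
	default:
		return fmt.Sprintf("%s = %q", vs.Name, vs.Inline)
	}
}

func main() {
	fmt.Println(project(ValueSource{Name: "S3_SECRET_KEY", Secret: &KeyRef{"s3-creds", "secret_access_key"}}))
	fmt.Println(project(ValueSource{Name: "SNOWFLAKE_ACCOUNT", Inline: "ab12345.us-east-1"}))
}
```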

Status conditions

| Condition | True when | False (examples) |
| --- | --- | --- |
| ClusterRef | cluster.clusterRef resolved → broker list + TLS material loaded | ClusterRefInvalid (cluster not found / not Ready) |
| UserRef | userRef.name exists, has password.valueFrom.secretKeyRef set, mechanism resolved | UserInvalid (User CR not found, missing Secret-backed password) |
| ConfigValid | redpanda-connect lint passes | ConfigInvalid |
| Ready | all of the above + Deployment Ready | otherwise false |

Tests

  • TestRender_InlineMergesRedpandaPlugins — six subtests covering the cluster-binding render path: merges into output.redpanda, merges into input.redpanda, user-supplied keys win on conflict, no *.redpanda block in user config → no injection, output.redpanda_common is intentionally not auto-configured (regression guard against re-emitting a top-level redpanda: block), fully-inline pipeline (no cluster binding) passes through unchanged.
  • TestRender_Deployment_ServiceAccountName — propagation of spec.serviceAccountName to Deployment.Spec.Template.Spec.ServiceAccountName; empty when unset.
  • TestRender_Deployment_ImagePrecedence — three subtests, one per image precedence tier (per-pipeline spec.image > chart-level connectController.image > binary constant).
  • TestRender_Deployment_ValueSources — asserts EnvFrom is empty on both the lint init container and the connect container, and that each ValueSource entry projects as exactly one typed EnvVar (inline → Value; secretKeyRef/configMapKeyRef → ValueFrom.{Secret,ConfigMap}KeyRef).
  • TestReconcile_InvalidClusterRefCleansUpManagedResources — cluster.clusterRef resolution failure short-circuits before user resolution; status surfaces ClusterRefInvalid and managed resources are torn down.
  • Existing TestRender_* cases cover Replicas, Paused, Zones, Resources, Annotations, Topology, Budget, ConfigFiles, MonitoringPodMonitor.
  • task lint (helm lint + golangci-lint + actionlint) clean.
  • task generate clean (no further diff).

End-to-end validation

The branch was exercised against real AWS infrastructure (EKS 1.34 + RDS MySQL 8.0 with iam_database_authentication_enabled=true) using the Example C scenario (per-pipeline IRSA + native RDS IAM auth + Pipeline CR writing to Redpanda). MySQL snapshot rows reached the mysql.shop.orders topic with no MySQL password anywhere in the Pipeline CR, the pod env, or any Secret. See the e2e comment thread for the full run + reproduction steps.

@david-yu david-yu changed the title Add Connect CRD for Redpanda Connect pipeline management Add Pipeline CRD for Redpanda Connect pipeline management Mar 24, 2026
@github-actions

This PR is stale because it has been open 5 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions Bot added the stale label Mar 30, 2026
@david-yu david-yu removed the stale label Mar 30, 2026
@david-yu david-yu marked this pull request as ready for review March 30, 2026 22:51
Contributor

@andrewstucki andrewstucki left a comment


Not sure if this is just a movement of connect pipeline reconcilers over from another repo, but would definitely want to change a chunk of the design around how this reconciliation works to be more in line with the patterns that this repo has before merging anything like this. Could we just add this in as part of a roadmap rather than trying to generate it? It shouldn't take more than a day or two to implement properly once we actually pull it in. But as is, there are a number of issues I see immediately with this PR that need changing:

  1. we try to use SSA semantics whenever possible, so the CreateOrPatch and Update calls are out-of-place.
  2. not a huge fan of swallowing the status Update errors on the reconcile calls, and it appears inconsistent -- sometimes it looks like we're returning the update error, sometimes swallowing it
  3. we generally try and externalize our sub-resource definitions to some sort of "render" package to avoid having to inline everything
  4. this should likely use the kube.Ctl synchronization primitives
  5. I'm assuming we'd probably want to run some of the secret stuff through cloud-secret materialization?
  6. would we want any of the configuration around Redpanda sources to somehow be pluggable with our clusterRef-style specification?
  7. this appears to not have created the RBAC policies in the proper place, as they need to be copied over to the helm chart itself
  8. the tests should actually test the reconciler, here they just do license validation
  9. I'd prefer to use some sort of enum/typed status information for the pipeline conditions, because what they are/do is basically undocumented right now
  10. at least one rendering test in the helm chart should test the enabling flag
  11. the CRD itself also needs to be added to the CRD installation process subcommand in order for this to ever work.
  12. for a new CRD type we should have at least one acceptance test that exercises the feature.

@david-yu
Contributor Author

david-yu commented Mar 31, 2026

Moving back to draft mode. Thanks for taking a look.

@david-yu david-yu marked this pull request as draft March 31, 2026 03:37
@github-actions

github-actions Bot commented Apr 6, 2026

This PR is stale because it has been open 5 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions Bot added the stale label Apr 6, 2026
@david-yu david-yu removed the stale label Apr 6, 2026
david-yu and others added 14 commits April 8, 2026 11:56
Introduces the Connect custom resource (shortName: rpcn) for managing
Redpanda Connect pipelines via the Redpanda Operator. Each Connect CR
declaratively specifies a pipeline configuration in YAML, and the
controller reconciles the desired state by managing a Deployment and
ConfigMap.

Enterprise license gating: the controller validates a Redpanda enterprise
license (v1 format from common-go/license) on every reconciliation. The
license must include the CONNECT product and be unexpired. The license is
read from a Kubernetes Secret referenced by spec.licenseSecretRef.

Key components:
- CRD types: Connect, ConnectSpec, ConnectStatus in v1alpha2
- Controller: creates/patches ConfigMap + Deployment, updates status
- RBAC: ClusterRole permissions for connects, deployments, configmaps, secrets
- CRD manifest: cluster.redpanda.com_connects.yaml
- Gated behind --enable-connect flag (default: false)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Update generated files to match what CI's controller-gen v0.20.1
and code generators produce:

- Move Connect deepcopy functions to correct alphabetical position
  (after Configurator, before ConnectorMonitoring)
- Regenerate CRD YAML with full OpenAPI schema from controller-gen
- Update crd-docs.adoc with Connect type documentation
- Add Connect deprecation test case
- Update RBAC role.yaml to match controller-gen output
- Add missing common-go/license go.sum entries in acceptance/ and gen/
- Fix whitespace in run.go

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix TestCRDS by adding connects.cluster.redpanda.com to the expected
CRD list and adding a Connect() helper function.

Add Cloud-compatible fields to ConnectSpec for smooth migration to
Redpanda Cloud managed Connect:
- displayName: human-readable pipeline name
- description: pipeline description
- tags: key-value pairs for filtering/organization
- configFiles: additional config files mounted at /config

The controller now includes configFiles entries in the ConfigMap
alongside connect.yaml, with a guard against key collision.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add displayName, description, tags, and configFiles documentation
to the ConnectSpec section of the generated CRD docs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add scheduling fields to ConnectSpec for spreading pipeline pods
across availability zones:

- zones: list of AZs to constrain and spread pods across. When set,
  the controller auto-generates a node affinity (restrict to listed
  zones) and a topology spread constraint (even distribution with
  maxSkew=1, ScheduleAnyway) using topology.kubernetes.io/zone.
- tolerations: standard k8s tolerations for tainted nodes
- nodeSelector: label-based node selection
- topologySpreadConstraints: explicit spread constraints that
  override the auto-generated zone constraint when provided

Example usage:
  spec:
    zones: ["us-east-1a", "us-east-1b", "us-east-1c"]
    replicas: 3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Update connects CRD YAML with full TopologySpreadConstraint schema
instead of x-kubernetes-preserve-unknown-fields, expand toleration
descriptions, fix field ordering (nodeSelector before paused), and
update crd-docs.adoc descriptions to match Go struct comments.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Connect controller is now enabled by default (--enable-connect=true).
Users can disable it via the operator helm chart value:

  helm install redpanda-operator ... --set connectController.enabled=false

Individual Connect pipeline CRs still require an enterprise license
with the CONNECT product — enabling the controller alone does not
grant enterprise functionality.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Update README, template, schema, partial types, and golden files
to include the new connectController chart value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Make spec.licenseSecretRef optional on Connect CRs. When not set, the
controller falls back to the operator-level enterprise license configured
via enterprise.licenseSecretRef in the operator Helm chart values.

This avoids requiring users to specify the license on every Connect
pipeline CR. The operator-level license is passed via --license-file-path
and mounted from the chart's enterprise.licenseSecretRef secret.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove spec.licenseSecretRef from Connect CRD entirely. License is
  now only configured at the operator level via enterprise.licenseSecretRef
  in the operator Helm chart values.
- Set connectController.enabled to false by default (opt-in).
- Simplify controller license validation to only read from the
  operator-level license file path.
- Add unit tests for license validation covering: no license configured,
  invalid file, expired license, open source license, V0 enterprise
  license with all products, V1 enterprise with/without CONNECT product,
  V1 trial license, and V1 expired enterprise license.
- Fix values.schema.json alphabetical ordering (connectController before crds).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
david-yu and others added 2 commits April 14, 2026 00:06
Match the v2 Redpanda CRD convention (Budget.MaxUnavailable *int)
rather than the v1 IntOrString pattern. Removes MinAvailable and
percentage support for a simpler API surface.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions

This PR is stale because it has been open 5 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions Bot added the stale label Apr 23, 2026
@david-yu david-yu removed the stale label Apr 23, 2026
@github-actions

github-actions Bot commented May 5, 2026

This PR is stale because it has been open 5 days with no activity. Remove stale label or comment or this will be closed in 5 days.

@github-actions github-actions Bot added the stale label May 5, 2026
@david-yu david-yu removed the stale label May 5, 2026
david-yu and others added 2 commits May 8, 2026 01:21
Resolve conflicts in three generated/test files; all conflicts were
mechanical (both branches added new entries to the same files):

- operator/chart/values_partial.gen.go: keep PartialConnectMonitoringConfig
  alongside the new PartialMulticlusterService struct from main.
- operator/chart/testdata/template-cases.txtar: keep the connect-controller
  test cases alongside the new multicluster service test cases.
- operator/chart/testdata/template-cases.golden.txtar: regenerated via
  `go test ./operator/chart/... -run TestTemplate -update-golden` so the
  chart version labels reflect main's bump to v26.2.1-beta.1.

Also correct CLAUDE.md: chart template tests use `-update-golden`, not
`-update`. The TxTar golden helper checks `goldenfile.Update()` which is
gated only by the `-update-golden` flag, so `-update` silently no-ops on
these tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Pipeline controller already validated that the operator-level enterprise
license includes the Connect product, but the rendered Connect pod ran with
the open-source default license. Enterprise inputs like mysql_cdc hit Connect's
own runtime license gate ("this feature requires a valid Redpanda Enterprise
Edition license that includes the Connect product") and crashed, forcing users
to manually wire the license into a `spec.secretRef`.

The renderer now mirrors the operator's license bytes into a Pipeline-owned
Secret (`<pipeline>-license`) and injects `REDPANDA_LICENSE` into the connect
and lint containers via a SecretKeyRef. The Secret is owned by the Pipeline CR
so it GCs cleanly on delete, and lives in the Pipeline's own namespace so no
cross-namespace secret references are needed. RBAC for `secrets` is widened
from `get;list;watch` to include `create;update;patch;delete`.

Also fix the PodMonitor CRD-presence check: `errors.Is(err,
&meta.NoKindMatchError{})` was comparing pointer addresses (NoKindMatchError
has no `Is` method), so the intended fast-path never ran. Replaced with
`meta.IsNoMatchError(err)`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@david-yu
Contributor Author

david-yu commented May 8, 2026

End-to-end test: mysql_cdc enterprise connector

Validated the Pipeline CRD against MySQL with binlog CDC streaming into Redpanda. Used the merged-with-main branch tip (340a0eb7) which includes the auto-license-propagation fix.

Stack

  • Cluster: k3d v5.8.3 with rancher/k3s:v1.32.13-k3s1
  • Operator: built from this branch as localhost/redpanda-operator:dev, --enable-connect=true, enterprise.licenseSecretRef pointing at a CONNECT-enabled enterprise license
  • Redpanda: v26.1.1 cluster (single broker, unauthenticated)
  • MySQL: 8.0 with --log-bin --binlog-format=ROW --binlog-row-image=FULL --gtid-mode=ON --enforce-gtid-consistency=ON
  • Pipeline CR: mysql_cdc input → redpanda output (using clusterRef)

Result: ✅ Pass

  • Pipeline phase reached Running (Ready=True, ConfigValid=True, ClusterRefResolved=True)
  • All 5 snapshot rows landed on topic mysql.shop.orders (offsets 0–4)
  • 2 rows inserted live after pipeline start propagated through binlog and arrived at offsets 5–6
  • Auto-rendered mysql-cdc-orders-license Secret was owned by the Pipeline CR (cleaned up on kubectl delete pipeline)
  • REDPANDA_LICENSE env var sourced via secretKeyRef to the auto-rendered Secret — no manual license wiring needed
{
  "topic": "mysql.shop.orders",
  "key": "orders",
  "value": "{\"cdc_received_at\":\"...\",\"created_at\":\"...\",\"customer\":\"alice\",\"id\":1,\"product\":\"wrench\",\"qty\":3}",
  "offset": 0
}

Reproduction steps

1. Prerequisites

  • nix develop shell (provides k3d, kubectl, helm)
  • Docker / Rancher Desktop running
  • A Redpanda enterprise license file that includes the CONNECT product (free trials don't qualify — both the operator's gate and the Connect runtime's gate require it)

2. Build the operator image

BUILD_GOOS=linux BUILD_GOARCH=arm64 task build:operator-image \
  PLATFORMS=linux/arm64 \
  CLI_ARGS=--load

Produces localhost/redpanda-operator:dev.

3. Spin up a k3d cluster + cert-manager + operator

k3d cluster create pipeline-e2e \
  --image rancher/k3s:v1.32.13-k3s1 --no-lb \
  --k3s-arg '--disable=traefik@server:0' \
  --k3s-arg '--disable=metrics-server@server:0'

k3d image import localhost/redpanda-operator:dev --cluster pipeline-e2e

helm repo add jetstack https://charts.jetstack.io --force-update
helm install cert-manager jetstack/cert-manager \
  -n cert-manager --create-namespace \
  --version v1.17.2 --set crds.enabled=true --wait

kubectl create namespace redpanda
kubectl create namespace mysql

kubectl -n redpanda create secret generic redpanda-license \
  --from-file=license=/path/to/redpanda.license

helm install redpanda-operator ./operator/chart \
  -n redpanda \
  --set image.repository=localhost/redpanda-operator \
  --set image.tag=dev \
  --set image.pullPolicy=IfNotPresent \
  --set rbac.createAdditionalControllerCRs=false \
  --set connectController.enabled=true \
  --set enterprise.licenseSecretRef.name=redpanda-license \
  --set enterprise.licenseSecretRef.key=license \
  --set crds.enabled=true \
  --set crds.experimental=true \
  --wait

4. Deploy a Redpanda cluster

# redpanda.yaml
apiVersion: cluster.redpanda.com/v1alpha2
kind: Redpanda
metadata:
  name: redpanda
  namespace: redpanda
spec:
  chartRef: {}
  clusterSpec:
    statefulset:
      replicas: 1
      sideCars:
        controllers:
          enabled: true
    resources:
      cpu: { cores: 1 }
      memory: { container: { max: 2Gi } }
    config:
      cluster:
        auto_create_topics_enabled: true
    console: { enabled: false }
    tls: { enabled: false }
    listeners:
      kafka:          { tls: { enabled: false } }
      admin:          { tls: { enabled: false } }
      schemaRegistry: { tls: { enabled: false } }
      http:           { tls: { enabled: false } }
kubectl apply -f redpanda.yaml

5. Deploy MySQL with binlog enabled and seed data

# mysql.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mysql-init
  namespace: mysql
data:
  init.sql: |
    CREATE DATABASE IF NOT EXISTS shop;
    USE shop;
    CREATE TABLE IF NOT EXISTS orders (
      id INT AUTO_INCREMENT PRIMARY KEY,
      customer VARCHAR(64) NOT NULL,
      product  VARCHAR(64) NOT NULL,
      qty      INT          NOT NULL,
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    INSERT INTO orders (customer, product, qty) VALUES
      ('alice','wrench',3),('bob','hammer',1),('carol','sprocket',7),
      ('dave','wrench',2),('eve','cog',5);
    CREATE USER IF NOT EXISTS 'cdc'@'%'
      IDENTIFIED WITH mysql_native_password BY 'cdcpass';
    GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD, LOCK TABLES,
          SHOW VIEW, PROCESS ON *.* TO 'cdc'@'%';
    FLUSH PRIVILEGES;
---
apiVersion: v1
kind: Service
metadata: { name: mysql, namespace: mysql }
spec:
  selector: { app: mysql }
  ports: [{ port: 3306, targetPort: 3306 }]
---
apiVersion: apps/v1
kind: StatefulSet
metadata: { name: mysql, namespace: mysql }
spec:
  serviceName: mysql
  replicas: 1
  selector: { matchLabels: { app: mysql } }
  template:
    metadata: { labels: { app: mysql } }
    spec:
      containers:
        - name: mysql
          image: mysql:8.0
          env:
            - { name: MYSQL_ROOT_PASSWORD, value: rootpass }
            - { name: MYSQL_DATABASE,      value: shop }
          args:
            - --server-id=1
            - --log-bin=/var/lib/mysql/mysql-bin
            - --binlog-format=ROW
            - --binlog-row-image=FULL
            - --gtid-mode=ON
            - --enforce-gtid-consistency=ON
            - --default-authentication-plugin=mysql_native_password
          ports: [{ name: mysql, containerPort: 3306 }]
          volumeMounts:
            - { name: init, mountPath: /docker-entrypoint-initdb.d }
            - { name: data, mountPath: /var/lib/mysql }
          readinessProbe:
            tcpSocket: { port: 3306 }
            initialDelaySeconds: 15
            periodSeconds: 5
      volumes:
        - { name: init, configMap: { name: mysql-init } }
        - { name: data, emptyDir: {} }
kubectl apply -f mysql.yaml

6. Apply the Pipeline CR

The user-managed Secret holds only the DSN — the operator auto-mounts the license:

# pipeline.yaml
apiVersion: v1
kind: Secret
metadata: { name: mysql-cdc-secret, namespace: redpanda }
stringData:
  MYSQL_DSN: "cdc:cdcpass@tcp(mysql.mysql.svc.cluster.local:3306)/shop"
---
apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata: { name: mysql-cdc-orders, namespace: redpanda }
spec:
  cluster:
    clusterRef: { name: redpanda }
  secretRef:
    - name: mysql-cdc-secret
  configYaml: |
    input:
      mysql_cdc:
        dsn: "${MYSQL_DSN}"
        stream_snapshot: true
        snapshot_max_batch_size: 100
        tables:
          - orders
        flavor: mysql
        checkpoint_cache: mysql_cdc_orders_checkpoint
        checkpoint_key: "mysql_cdc_orders_checkpoint"
        checkpoint_limit: 1024
    pipeline:
      processors:
        - mapping: |
            root = this
            root.cdc_received_at = now()
    output:
      redpanda:
        seed_brokers: [ "${RPK_BROKERS}" ]
        topic: "mysql.shop.orders"
        key: '${! @table }'
    cache_resources:
      - label: mysql_cdc_orders_checkpoint
        memory: { default_ttl: 1h }
  replicas: 1
kubectl apply -f pipeline.yaml

Notes on the mysql_cdc config:

  • tables takes unqualified names (the DSN's /shop already scopes the database); shop.orders is rejected as an invalid table name.
  • checkpoint_cache is required and must reference one of the cache_resources labels.
  • RPK_BROKERS is injected by the operator from the clusterRef resolution.

7. Verify

kubectl -n redpanda get pipeline mysql-cdc-orders
# NAME               READY   PHASE     REPLICAS   AVAILABLE   AGE
# mysql-cdc-orders   True    Running   1          1           ...

kubectl -n redpanda exec redpanda-0 -c redpanda -- \
  rpk topic consume mysql.shop.orders \
  --brokers redpanda-0.redpanda.redpanda.svc.cluster.local:9093 \
  -n 5 --offset start

# Insert live rows and watch them stream:
kubectl -n mysql exec mysql-0 -- mysql -uroot -prootpass -e \
  "USE shop; INSERT INTO orders (customer,product,qty) VALUES ('frank','gear',11);"

kubectl -n redpanda exec redpanda-0 -c redpanda -- \
  rpk topic consume mysql.shop.orders \
  --brokers redpanda-0.redpanda.redpanda.svc.cluster.local:9093 \
  -n 1 --offset 5

8. Cleanup

k3d cluster delete pipeline-e2e

@david-yu
Contributor Author

david-yu commented May 8, 2026

Going to mark this ready for review, given I was able to run an end-to-end test with a few connectors successfully.

@david-yu david-yu marked this pull request as ready for review May 8, 2026 09:01
The previous commit widened the Pipeline controller's RBAC marker for
`secrets` from `get;list;watch` to also include `create;update;patch;delete`
so the renderer can manage the per-Pipeline license Secret. The pipeline
ClusterRole was regenerated, but the operator chart golden file (which
captures the rendered chart output for the connect-controller-enabled,
connect-controller-with-license, and connect-monitoring-enabled cases) was
not.

Regenerated via:
  go test ./operator/chart/... -run TestTemplate -update-golden

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions

This PR is stale because it has been open 5 days with no activity. Remove stale label or comment or this will be closed in 5 days.

david-yu and others added 2 commits May 15, 2026 08:26
# Conflicts:
#	acceptance/main_test.go
Reworks the Pipeline spec contract per review feedback. The bag-of-Secret
env-splat pattern is gone; instead the spec is statically typed against the
same Kubernetes-native primitives the other CRDs already use.

Spec changes (operator/api/redpanda/v1alpha2/pipeline_types.go):

  - Drop: spec.env ([]corev1.EnvVar), spec.secretRef ([]corev1.LocalObjectReference).
  - Add:  spec.valueSources ([]NamedValueSource) — one typed env-var
          projection per entry, sourced from inline / configMapKeyRef /
          secretKeyRef / externalSecretRef (same ValueSource primitive the
          User CR uses).
  - Add:  spec.userRef (*PipelineUserRef) — binds the pipeline to a User
          CR. The operator reads the referenced User's password Secret +
          SASL mechanism and uses User.metadata.name as the SASL username.
          The User CR remains user-managed (operator does not auto-create
          it) so ACL scoping stays auditable.
  - Existing spec.cluster (*ClusterSource) semantics expanded: when set,
          the operator generates a top-level `redpanda` block in the
          rendered connect.yaml (seed_brokers, tls.root_cas_file, sasl)
          so user YAML never has to hardcode brokers/TLS/SASL. User-side
          `redpanda` keys merge on top of the generated block.
  - CEL on PipelineSpec enforces the dichotomy: userRef required with
          cluster.clusterRef, forbidden with cluster.staticConfiguration,
          forbidden without cluster.clusterRef.
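
  The dichotomy, restated as plain Go for readability (hypothetical
  helper — the shipped enforcement is the CEL expression itself, and
  clusterRef/staticConfiguration are already mutually exclusive in the
  ClusterSource union):

```go
package main

import (
	"errors"
	"fmt"
)

// validateUserRef mirrors the CEL rule on PipelineSpec:
//   - userRef is required when cluster.clusterRef is set,
//   - forbidden with cluster.staticConfiguration,
//   - forbidden when cluster.clusterRef is absent.
func validateUserRef(hasClusterRef, hasStaticConfig, hasUserRef bool) error {
	switch {
	case hasClusterRef && !hasUserRef:
		return errors.New("userRef is required with cluster.clusterRef")
	case hasStaticConfig && hasUserRef:
		return errors.New("userRef is forbidden with cluster.staticConfiguration")
	case !hasClusterRef && hasUserRef:
		return errors.New("userRef is forbidden without cluster.clusterRef")
	}
	return nil
}

func main() {
	fmt.Println(validateUserRef(true, false, true))  // <nil>
	fmt.Println(validateUserRef(false, true, true))  // userRef is forbidden with cluster.staticConfiguration
}
```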

Controller changes (operator/internal/controller/pipeline/):

  - cluster.go: new resolveUserRef() + userCredentials type.
  - render.go: drop EnvFrom on lint+connect containers; replace with
          buildValueSourceEnv (per-key typed env). New renderConnectYAML
          parses spec.configYaml, injects the generated `redpanda` block
          (user keys win on merge), and re-emits.
  - controller.go: resolve userRef alongside clusterRef; surface
          PipelineConditionUserRef / PipelineReasonUserResolved or
          PipelineReasonUserInvalid; pass userCredentials into the
          renderer.

Test (controller_test.go):
  - TestRender_Deployment_SecretRef → TestRender_Deployment_ValueSources,
    asserts envFrom is empty and that each ValueSource entry projects as a
    single typed EnvVar.
  - TestReconcile_InvalidClusterRefCleansUpManagedResources: add a stub
    userRef so the CEL-validated spec is accepted; cluster resolution
    still surfaces ClusterRefInvalid first.

Regenerated:
  - zz_generated.deepcopy.go, CRD YAML, crd-docs.adoc, chart template
    golden, applyconfiguration helpers.

Lint clean (helm lint + golangci-lint + actionlint).
Package tests pass: go test ./operator/internal/controller/pipeline/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@david-yu
Contributor Author

david-yu commented May 15, 2026

End-to-end test: EKS + RDS, new spec shape, Redpanda Connect v4.92.0

Re-ran the mysql_cdc-into-Redpanda scenario from earlier, this time against real AWS infra and using the v2 Pipeline spec from e7a2da66 (cluster.clusterRef + userRef + valueSources) instead of the old secretRef[]/env[] shape.

Stack

  • EKS: 1.31, single-AZ, 2× m5.large workers
  • Redpanda: v26.1.1 via Redpanda chart, SASL enabled, 1 broker
  • Redpanda Connect: docker.redpanda.com/redpandadata/connect:4.92.0 (pinned via Pipeline.spec.image)
  • RDS: MySQL 8.0 db.t3.medium, custom parameter group with binlog_format=ROW, binlog_row_image=FULL
  • Operator: built from PR HEAD e7a2da66, pushed to 605419575229.dkr.ecr.us-east-2.amazonaws.com/redpanda-operator-pr1337:e7a2da66
  • Secrets: RDS password in AWS Secrets Manager; External Secrets Operator (ESO) syncs it to a K8s Secret via IRSA
  • MySQL CDC user: cdc with REPLICATION SLAVE, REPLICATION CLIENT, SELECT shop.*, FLUSH_TABLES, LOCK TABLES shop.*

Result: ✅ Pass

  • Pipeline Phase=Running, Ready=True, ConfigValid=True, ClusterRef=True (ClusterRefResolved), UserRef=True (UserResolved with mechanism SCRAM-SHA-512).
  • All 5 snapshot rows reached mysql.shop.orders (offsets 0–4).
  • One live-INSERT row reached the topic at offset 10 (offsets 5–9 are a second snapshot from a pipeline restart during the operator config dance; that's a separate gotcha, see "Notes" below).
offset=0   customer=alice    product=wrench    qty=3   id=1   snapshot
offset=1   customer=carol    product=sprocket  qty=7   id=3   snapshot
offset=2   customer=dave     product=wrench    qty=2   id=4   snapshot
offset=3   customer=bob      product=hammer    qty=1   id=2   snapshot
offset=4   customer=eve      product=cog       qty=5   id=5   snapshot
offset=5–9                                                    re-snapshot after pipeline pod restart
offset=10  customer=frank    product=gear      qty=11  id=6   live binlog replay  ← validates CDC

What the new spec actually buys you

The CR for the pipeline has no inline brokers / TLS / SASL credentials — the operator generates the top-level redpanda block from cluster.clusterRef + userRef:

Pipeline CR (excerpt):

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata: { name: mysql-cdc-orders, namespace: redpanda }
spec:
  # No `image:` here — the chart-level default `connectController.image`
  # set during `helm install` (step 5 below) pins this Pipeline (and every
  # other Pipeline in the cluster) to redpandadata/connect:4.92.0.
  # Per-Pipeline `.spec.image` still wins if it's set; see
  # `TestRender_Deployment_ImagePrecedence` for the three-tier contract.
  cluster:
    clusterRef: { name: redpanda }
  userRef:
    name: mysql-cdc-orders-svc
  valueSources:
    - name: MYSQL_PASSWORD
      source:
        secretKeyRef: { name: mysql-cdc-creds, key: password }
    - name: MYSQL_HOST
      source:
        secretKeyRef: { name: mysql-cdc-creds, key: host }
    - name: MYSQL_USER
      source:
        secretKeyRef: { name: mysql-cdc-creds, key: username }
  configYaml: |
    input:
      mysql_cdc:
        dsn: "${MYSQL_USER}:${MYSQL_PASSWORD}@tcp(${MYSQL_HOST}:3306)/shop"
        tables: [orders]
        stream_snapshot: true
        flavor: mysql
        checkpoint_cache: mysql_cdc_orders_checkpoint
        checkpoint_key: mysql_cdc_orders_checkpoint
        checkpoint_limit: 1024
    pipeline:
      processors:
        - mapping: |
            root = this
            root.cdc_received_at = now()
    output:
      redpanda_common:                     # ← shared-client output; uses the operator-generated `redpanda:` block
        topic: "mysql.shop.orders"
        key: '${! @table }'
    cache_resources:
      - label: mysql_cdc_orders_checkpoint
        memory: { default_ttl: 1h }

Rendered connect.yaml the operator writes to the pod's ConfigMap:

cache_resources:
- label: mysql_cdc_orders_checkpoint
  memory:
    default_ttl: 1h
input:
  mysql_cdc:
    checkpoint_cache: mysql_cdc_orders_checkpoint
    checkpoint_key: mysql_cdc_orders_checkpoint
    checkpoint_limit: 1024
    dsn: ${MYSQL_USER}:${MYSQL_PASSWORD}@tcp(${MYSQL_HOST}:3306)/shop
    flavor: mysql
    snapshot_max_batch_size: 100
    stream_snapshot: true
    tables:
    - orders
output:
  redpanda_common:
    key: ${! @table }
    topic: mysql.shop.orders
pipeline:
  processors:
  - mapping: |
      root = this
      root.cdc_received_at = now()
redpanda:                                  # ← generated from cluster.clusterRef + userRef
  sasl:
  - mechanism: SCRAM-SHA-512
    password: ${REDPANDA_SASL_PASSWORD}    # injected env from User CR's password Secret
    username: ${REDPANDA_SASL_USERNAME}    # injected env (literal: "mysql-cdc-orders-svc")
  seed_brokers:
  - redpanda-0.redpanda.redpanda.svc.cluster.local.:9093

Pod env vars projected by the operator:

  • REDPANDA_SASL_USERNAME = mysql-cdc-orders-svc (literal, from User.metadata.name)
  • REDPANDA_SASL_MECHANISM = SCRAM-SHA-512
  • REDPANDA_SASL_PASSWORD — via secretKeyRef to the User CR's password Secret
  • MYSQL_PASSWORD / MYSQL_HOST / MYSQL_USER — via secretKeyRef to mysql-cdc-creds (the ESO-materialized Secret backed by AWS Secrets Manager)

The Pipeline CR contains zero plaintext credentials.
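
For reference, the per-key projection amounts to the following — simplified stand-in types, since the real buildValueSourceEnv emits corev1.EnvVar values with SecretKeySelector sources:

```go
package main

import "fmt"

// Simplified stand-ins for the CRD and corev1 types.
type SecretKeySelector struct{ Name, Key string }

type ValueSource struct {
	Inline       string
	SecretKeyRef *SecretKeySelector
}

type NamedValueSource struct {
	Name   string
	Source ValueSource
}

type EnvVar struct {
	Name    string
	Value   string // set for inline sources
	FromRef string // "secret/<name>[<key>]" for Secret-backed sources
}

// buildValueSourceEnv projects each entry to exactly one typed env var —
// no envFrom splat of whole Secrets into the container environment.
func buildValueSourceEnv(sources []NamedValueSource) []EnvVar {
	out := make([]EnvVar, 0, len(sources))
	for _, s := range sources {
		ev := EnvVar{Name: s.Name, Value: s.Source.Inline}
		if ref := s.Source.SecretKeyRef; ref != nil {
			ev.FromRef = fmt.Sprintf("secret/%s[%s]", ref.Name, ref.Key)
		}
		out = append(out, ev)
	}
	return out
}

func main() {
	env := buildValueSourceEnv([]NamedValueSource{
		{Name: "MYSQL_HOST", Source: ValueSource{SecretKeyRef: &SecretKeySelector{Name: "mysql-cdc-creds", Key: "host"}}},
	})
	fmt.Println(env[0].Name, env[0].FromRef) // MYSQL_HOST secret/mysql-cdc-creds[host]
}
```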

Reproduction steps

1. Prerequisites

  • aws + kubectl + helm + Docker on the laptop.
  • An EKS-capable AWS account; AWS_PROFILE set.
  • A Redpanda enterprise license file with the CONNECT product entitlement (the Connect runtime gates mysql_cdc on it).
  • The PR 1337 branch checked out locally so helm install can use the operator chart from the branch.

2. Terraform (AWS infra)

The pr1337-eks-rds/terraform/ directory provisions:

  • VPC (3 AZs, NAT, private subnets for EKS + RDS)
  • EKS 1.31 with OIDC provider
  • 1× m5.large managed node group
  • RDS MySQL 8.0 db.t3.medium in private subnets, with a custom mysql8.0 parameter group enabling binlog_format=ROW + binlog_row_image=FULL (NB: GTID was dropped — RDS rejects setting gtid_mode=ON directly, requires the state-walk OFF→OFF_PERMISSIVE→ON_PERMISSIVE→ON. mysql_cdc works fine on file-position binlog reading without GTID.)
  • AWS Secrets Manager secrets for the RDS master + CDC user
  • IAM role for ESO with secretsmanager:GetSecretValue on those secrets, trust on the EKS OIDC provider scoped to system:serviceaccount:external-secrets:external-secrets
  • ECR repo for the operator image
cd pr1337-eks-rds/terraform
terraform init
terraform plan -out tfplan
terraform apply tfplan

3. Build + push the operator image

cd <redpanda-operator repo at e7a2da66>
GOOS=linux GOARCH=amd64 go build -C ./operator \
  -o ../.build/redpanda-operator-linux-amd64 \
  -ldflags '...' ./cmd/main.go
GOOS=linux GOARCH=amd64 go build -C ./alias \
  -o ../.build/alias-linux-amd64 \
  -ldflags '-X "main.AliasTo=/redpanda-operator run"' .

aws ecr get-login-password --region us-east-2 | \
  docker login --username AWS --password-stdin \
  605419575229.dkr.ecr.us-east-2.amazonaws.com

docker buildx build --platform linux/amd64 --provenance false --sbom false \
  --file operator/Dockerfile --target=manager \
  --tag 605419575229.dkr.ecr.us-east-2.amazonaws.com/redpanda-operator-pr1337:e7a2da66 \
  --push .build

4. cert-manager + ESO + EBS CSI

aws eks update-kubeconfig --region us-east-2 --name rp-pr1337-eks-rds
kubectl create namespace redpanda

# cert-manager
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.17.2/cert-manager.yaml
kubectl -n cert-manager wait --for=condition=Available deploy --all --timeout=5m

# ESO with IRSA
helm repo add external-secrets https://charts.external-secrets.io && helm repo update
helm upgrade --install external-secrets external-secrets/external-secrets \
  --namespace external-secrets --create-namespace \
  --set installCRDs=true \
  --set 'serviceAccount.annotations.eks\.amazonaws\.com/role-arn=arn:aws:iam::605419575229:role/rp-pr1337-eks-rds-eso' \
  --wait

# EBS CSI driver — EKS 1.31 doesn't auto-install it
aws iam attach-role-policy --role-name <node-role> \
  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy
aws eks create-addon --cluster-name rp-pr1337-eks-rds --region us-east-2 \
  --addon-name aws-ebs-csi-driver --resolve-conflicts OVERWRITE
kubectl patch storageclass gp2 -p '{"metadata":{"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'

5. Operator (PR 1337 chart)

kubectl -n redpanda create secret generic redpanda-license \
  --from-file=license=/path/to/redpanda.license

# NEW chart value (commit 002fd26f): connectController.image is the
# chart-level default Connect image. Every Pipeline CR that doesn't pin its
# own .spec.image inherits it, so the Connect runtime version is controlled
# at install time rather than per-CR. Per-Pipeline .spec.image still wins;
# if neither is set, the operator falls back to the PipelineDefaultImage
# constant baked into the binary.
helm upgrade --install redpanda-operator <pr-1337 repo>/operator/chart \
  --namespace redpanda \
  --set image.repository=605419575229.dkr.ecr.us-east-2.amazonaws.com/redpanda-operator-pr1337 \
  --set image.tag=e7a2da66 \
  --set image.pullPolicy=IfNotPresent \
  --set rbac.createAdditionalControllerCRs=false \
  --set connectController.enabled=true \
  --set connectController.image.repository=docker.redpanda.com/redpandadata/connect \
  --set connectController.image.tag=4.92.0 \
  --set enterprise.licenseSecretRef.name=redpanda-license \
  --set enterprise.licenseSecretRef.key=license \
  --set crds.enabled=true --set crds.experimental=true \
  --wait

6. Redpanda CR (SASL on, 1 broker)

kubectl -n redpanda create secret generic redpanda-bootstrap-user \
  --from-literal=password=$(openssl rand -hex 16)
kubectl -n redpanda create secret generic users --from-literal=placeholder=ignored

# 01-redpanda.yaml: clusterSpec.auth.sasl.enabled=true, bootstrapUser→redpanda-bootstrap-user,
# users[]: empty (users come from User CR), tls disabled for simplicity.
kubectl apply -f manifests/01-redpanda.yaml
kubectl -n redpanda wait redpanda/redpanda --for=condition=Ready --timeout=10m

7. ESO sync of the RDS password

# 02-eso.yaml: ClusterSecretStore + ExternalSecret pulling rp-pr1337-eks-rds/cdc-user from
# AWS Secrets Manager into a K8s Secret "mysql-cdc-creds" with keys {password, host, username}.
sed -e 's|REPLACE_REGION|us-east-2|g' \
    -e 's|REPLACE_CDC_SECRET_NAME|rp-pr1337-eks-rds/cdc-user|g' \
    manifests/02-eso.yaml | kubectl apply -f -
kubectl -n redpanda wait --for=condition=Ready externalsecret/mysql-cdc-creds --timeout=2m

8. Bootstrap RDS MySQL

kubectl apply -f manifests/05-mysql-bootstrap-job.yaml
kubectl -n redpanda wait --for=condition=Complete job/mysql-bootstrap --timeout=5m
# RDS doesn't grant RELOAD; grant the MySQL 8.0 dynamic privilege FLUSH_TABLES + LOCK TABLES instead
kubectl apply -f manifests/06-grant-flush-tables.yaml
kubectl apply -f manifests/07-grant-lock-tables.yaml

9. User CR + Pipeline CR

# 03-pipeline-user.yaml: User CR scoped to write topic mysql.shop.orders + Group prefix "mysql-cdc-orders"
# 04-pipeline.yaml: Pipeline CR with cluster.clusterRef + userRef + valueSources (see above)
kubectl apply -f manifests/03-pipeline-user.yaml
kubectl -n redpanda wait --for=condition=Synced user/mysql-cdc-orders-svc --timeout=2m
kubectl apply -f manifests/04-pipeline.yaml
kubectl -n redpanda wait --for=condition=Ready pipeline/mysql-cdc-orders --timeout=5m

10. Verify CDC

PASS=$(kubectl -n redpanda get secret redpanda-bootstrap-user -o jsonpath='{.data.password}' | base64 -d)

# 5 snapshot rows
kubectl -n redpanda exec redpanda-0 -c redpanda -- \
  rpk topic consume mysql.shop.orders -n 5 --offset start \
    --user kubernetes-controller --password "$PASS" \
    --sasl-mechanism SCRAM-SHA-256 \
    --brokers redpanda-0.redpanda.redpanda.svc.cluster.local.:9093

# Live insert
kubectl apply -f - <<EOF2
apiVersion: batch/v1
kind: Job
metadata: { name: mysql-insert-frank, namespace: redpanda }
spec:
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: ins
          image: mysql:8.0
          env:
            - { name: MYSQL_HOST, valueFrom: { secretKeyRef: { name: rds-master-bootstrap, key: host } } }
            - { name: MYSQL_USER, valueFrom: { secretKeyRef: { name: rds-master-bootstrap, key: username } } }
            - { name: MYSQL_PWD,  valueFrom: { secretKeyRef: { name: rds-master-bootstrap, key: password } } }
          command: ["mysql","-h","\$(MYSQL_HOST)","-u","\$(MYSQL_USER)","-e","USE shop; INSERT INTO orders (customer,product,qty) VALUES ('frank','gear',11);"]
EOF2

# Verify frank appears
kubectl -n redpanda exec redpanda-0 -c redpanda -- \
  rpk topic consume mysql.shop.orders -n 1 --offset 10 \
    --user kubernetes-controller --password "$PASS" \
    --sasl-mechanism SCRAM-SHA-256 \
    --brokers redpanda-0.redpanda.redpanda.svc.cluster.local.:9093

Notes / friction points worth knowing about

  1. output.redpanda vs output.redpanda_common. Connect 4.92.0's output.redpanda plugin does NOT use the top-level redpanda: block as its shared client — it requires inline seed_brokers. The plugin that DOES use the shared client is output.redpanda_common (and input.redpanda_common). The operator's auto-generated top-level redpanda block matches what redpanda_common expects. Pipelines authored against clusterRef + userRef need to use the _common variant on their input/output. Worth a docstring callout in Pipeline.spec.configYaml's godoc.

  2. RDS / mysql_cdc privilege quirks.

    • RELOAD is restricted on RDS (master-only). mysql_cdc's default path runs FLUSH TABLES X WITH READ LOCK, which used to require RELOAD. Grant the MySQL 8.0 dynamic privilege FLUSH_TABLES instead — RDS allows it and mysql_cdc accepts it.
    • LOCK TABLES on the database is also needed (not just FLUSH_TABLES) — GRANT LOCK TABLES ON shop.* TO cdc@'%'.
    • GTID mode in the param group is awkward on RDS (multi-step state machine). Skip it; position-based binlog reading works.
  3. EKS 1.31 doesn't bundle the EBS CSI driver. PVC creation hangs with pod has unbound immediate PersistentVolumeClaims until the addon + IAM policy are added. Step 4 above handles it.

  4. Operator stale broker IP. When the Redpanda cluster rolls (e.g. flipping SASL on after first install), the operator's cached broker connection points at the old pod IP. Without a restart, the User controller's reconcile fails with dial tcp X: connect: no route to host and User CRs sit at Synced=False. Workaround: kubectl rollout restart deploy/redpanda-operator -n redpanda after the Redpanda pod is back up. Probably worth a client-connection refresh on broker-pod-change events in the operator.

  5. Auto-generated redpanda: block requires the underlying cluster to actually have SASL on. The operator wires userRef → SASL config into the rendered redpanda block regardless of whether the Redpanda cluster has auth.sasl.enabled=true. If the cluster doesn't, Connect's SASL handshake gets ILLEGAL_SASL_STATE and the shared client never initializes. The operator could reject the CR (or fall through to a no-SASL render) when the resolved cluster's auth isn't on; today it silently produces a broken config. Probably worth a status condition like ClusterAuthCompatible.
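
  The guard could be as small as the following (hypothetical sketch of the suggested behavior — today's operator renders SASL unconditionally):

```go
package main

import (
	"errors"
	"fmt"
)

// renderSASL sketches the suggested guard: refuse to render the SASL
// block when userRef is set but the resolved cluster has
// auth.sasl.enabled=false, which would otherwise fail at runtime with
// ILLEGAL_SASL_STATE. The error would surface as a status condition
// (e.g. ClusterAuthCompatible=False).
func renderSASL(clusterSASLEnabled, userRefSet bool) (bool, error) {
	if userRefSet && !clusterSASLEnabled {
		return false, errors.New("ClusterAuthCompatible=False: userRef set but cluster SASL is disabled")
	}
	return userRefSet, nil
}

func main() {
	ok, err := renderSASL(false, true)
	fmt.Println(ok, err != nil) // false true
	ok, err = renderSASL(true, true)
	fmt.Println(ok, err == nil) // true true
}
```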

  6. IRSA target. userRef + AWS Secrets Manager interplay: the pipeline pod itself doesn't talk to AWS. ESO does, on the pipeline's behalf, using its own IRSA-annotated service account. The pipeline pod just reads the K8s Secret that ESO materialized. That's the standard production K8s-RDS pattern — IRSA gates AWS API access, not MySQL handshake. For native RDS IAM database auth (where the pod calls rds:GenerateDBAuthToken and uses the token as the MySQL password) the Pipeline CR would need a token-refresh primitive — that's a separate follow-up not blocking this PR.


Update 2026-05-16 — Re-test on EKS 1.34 with native RDS IAM database authentication

Re-ran the e2e against EKS 1.34 using native RDS IAM database authentication via mysql_cdc.aws.enabled: true (per the Connect docs) instead of ESO-fetched MySQL passwords. This also surfaced a deprecation issue and a new design recommendation for the Pipeline CRD.

Result: ✅ Pass with native IAM auth

  • No MySQL password lives anywhere in the pipeline pod or its env. The cdc user on RDS is created with IDENTIFIED WITH AWSAuthenticationPlugin AS 'RDS'. The pipeline pod's IRSA-assumed role (...:pipeline-rds) is granted rds-db:connect on arn:aws:rds-db:<region>:<acct>:dbuser:<DbiResourceId>/cdc. mysql_cdc.aws.enabled: true makes Connect call rds:GenerateDBAuthToken and use the 15-min token as the MySQL password.
  • CDC topic mysql.shop.orders populated with snapshot rows; verified via one-shot rpk consumer:
    offset=0 key=orders val={"cdc_received_at":"…","created_at":"…","customer":"alice","id":1,"product":"wrench","qty":3}
    offset=1 …carol/sprocket
    offset=2 …eve/cog
    …
    
  • Used the non-deprecated redpanda output (not redpanda_common) — see design note below.

New / corrected friction points

N1 — redpanda_common is deprecated; CRD design is locked to it (NEW, biggest issue).
PR 1337's operator currently emits a top-level redpanda: block consumed only by the deprecated redpanda_common plugin. A Pipeline CR that uses the non-deprecated output.redpanda plugin still has to hand-write seed_brokers/sasl/tls even though all that information is already known to the operator via spec.cluster.clusterRef + spec.userRef. As Connect's deprecation cycle moves forward, the auto-generated config will start emitting warnings and eventually break.

Recommended redesign (inline-merge approach): the operator should scan the user-supplied connect.yaml, find any output.redpanda / input.redpanda blocks, and merge in seed_brokers / tls / sasl derived from the resolved cluster and user. The user writes only topic / key / etc.; the operator handles the wire-up. This keeps the Pipeline CRD k8s-native (the whole point of clusterRef + userRef) and follows the supported plugin path.

A working example of what the rendered config should look like (after the inline-merge):

output:
  redpanda:
    seed_brokers:
      - redpanda-0.redpanda.redpanda.svc.cluster.local.:9093
    sasl:
      - mechanism: SCRAM-SHA-512
        username: ${REDPANDA_SASL_USERNAME}
        password: ${REDPANDA_SASL_PASSWORD}
    topic: "mysql.shop.orders"
    key: '${! @table }'

The user authored:

output:
  redpanda:
    topic: "mysql.shop.orders"
    key: '${! @table }'
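
A sketch of what that inline-merge could look like in the renderer — hypothetical helper, with plain maps standing in for parsed YAML nodes and connection fields taken from this run:

```go
package main

import "fmt"

// connectionFields stands in for what the operator derives from the
// resolved cluster + userRef.
var connectionFields = map[string]any{
	"seed_brokers": []any{"redpanda-0.redpanda.redpanda.svc.cluster.local.:9093"},
	"sasl": []any{map[string]any{
		"mechanism": "SCRAM-SHA-512",
		"username":  "${REDPANDA_SASL_USERNAME}",
		"password":  "${REDPANDA_SASL_PASSWORD}",
	}},
}

// injectConnection walks the parsed config and merges the connection
// fields into any input.redpanda / output.redpanda block. User-authored
// keys (topic, key, …) are left untouched and win on conflict.
func injectConnection(config map[string]any) {
	for _, section := range []string{"input", "output"} {
		sec, ok := config[section].(map[string]any)
		if !ok {
			continue
		}
		plugin, ok := sec["redpanda"].(map[string]any)
		if !ok {
			continue // a different plugin is in use; nothing to merge
		}
		for k, v := range connectionFields {
			if _, exists := plugin[k]; !exists { // user keys win
				plugin[k] = v
			}
		}
	}
}

func main() {
	config := map[string]any{
		"output": map[string]any{
			"redpanda": map[string]any{"topic": "mysql.shop.orders", "key": "${! @table }"},
		},
	}
	injectConnection(config)
	out := config["output"].(map[string]any)["redpanda"].(map[string]any)
	fmt.Println(out["topic"])               // mysql.shop.orders
	fmt.Println(out["seed_brokers"] != nil) // true
}
```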

N6 (CORRECTED) — Native RDS IAM auth works today; no Pipeline CR primitive is required.
My original friction point #6 said the Pipeline CR would need a "token-refresh primitive" for native RDS IAM auth. That was wrong. mysql_cdc.aws.enabled: true makes Connect call rds:GenerateDBAuthToken itself and refresh on its own; no operator changes needed. The only operator-side gap is that the rendered Deployment exposes no serviceAccountName field — for now I annotated the redpanda namespace's default SA with the IRSA role ARN, which works but is awkward (every pipeline in the namespace shares that role).

Suggested follow-up: expose Pipeline.spec.podSpec.serviceAccountName (or similar) so the IRSA boundary can be per-pipeline.

N7 (NEW) — mysql_cdc.aws.endpoint requires host:port, not just host.
The IAM token signer reports "the provided endpoint is missing a port, or the provided port is invalid" if endpoint: <hostname> is set without :3306. This is a Connect-side input-validation gap, not a PR 1337 issue. Worth a docstring on mysql_cdc.aws.endpoint.
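
A cheap pre-flight check for the endpoint shape (stdlib sketch; hasPort is a hypothetical helper, not part of Connect):

```go
package main

import (
	"fmt"
	"net"
)

// hasPort reports whether endpoint is in host:port form, which the IAM
// token signer requires (e.g. "db.xyz.us-east-2.rds.amazonaws.com:3306").
func hasPort(endpoint string) bool {
	_, port, err := net.SplitHostPort(endpoint)
	return err == nil && port != ""
}

func main() {
	fmt.Println(hasPort("mysql.example.com"))      // false — signer would reject this
	fmt.Println(hasPort("mysql.example.com:3306")) // true
}
```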

N8 (NEW) — IAM auth requires allowCleartextPasswords=1 in the DSN.
The Go MySQL driver refuses to send the IAM token (which it treats as a plaintext password) by default, so add &allowCleartextPasswords=1 to the DSN. That's safe because TLS protects the token on the wire (tls=skip-verify for the test; tls=true plus the RDS CA bundle for production). Worth a docs note on mysql_cdc.dsn and/or in the IAM-auth example.

N9 (NEW) — RDS bootstrap SQL: REQUIRE SSL; is not valid as a standalone statement.
MySQL requires REQUIRE SSL to be part of CREATE USER/ALTER USER, not a free-standing statement. A common pasted-from-docs gotcha. Not a PR issue.

Stack (this run)

  • EKS: 1.34, single-AZ, 2× m5.large
  • Redpanda: v25.x (chart from PR 1337 002fd26f), SASL enabled, 1 broker
  • Connect: 4.92.0 (pinned via chart-level connectController.image.tag)
  • RDS: MySQL 8.0 db.t3.medium, iam_database_authentication_enabled=true
  • Pipeline IRSA role: arn:aws:iam::605419575229:role/rp-pr1337-eks-rds-pipeline-rds, trust=system:serviceaccount:redpanda:default, inline policy=rds-db:connect on the resolved DbiResourceId/cdc
  • MySQL CDC user: cdc IDENTIFIED WITH AWSAuthenticationPlugin AS 'RDS' REQUIRE SSL

Pipeline CR (this run; uses output.redpanda directly)

apiVersion: cluster.redpanda.com/v1alpha2
kind: Pipeline
metadata: { name: mysql-cdc-orders, namespace: redpanda }
spec:
  cluster:
    clusterRef: { name: redpanda }
  userRef:
    name: mysql-cdc-orders-svc
  valueSources:
    # No MYSQL_PASSWORD — IAM auth supplies it on the fly.
    - name: MYSQL_HOST
      source: { secretKeyRef: { name: mysql-cdc-creds, key: host } }
    - name: MYSQL_USER
      source: { secretKeyRef: { name: mysql-cdc-creds, key: username } }
  configYaml: |
    input:
      mysql_cdc:
        dsn: "${MYSQL_USER}@tcp(${MYSQL_HOST}:3306)/shop?tls=skip-verify&allowCleartextPasswords=1"
        aws:
          enabled: true
          region: us-east-2
          endpoint: "${MYSQL_HOST}:3306"
        stream_snapshot: true
        tables: [orders]
        flavor: mysql
        checkpoint_cache: mysql_cdc_orders_checkpoint
        checkpoint_key: mysql_cdc_orders_checkpoint
        checkpoint_limit: 1024
    pipeline:
      processors:
        - mapping: |
            root = this
            root.cdc_received_at = now()
    output:
      # Using non-deprecated `redpanda`. Until the operator inline-merges
      # cluster connection fields (N1 above), seed_brokers + SASL still
      # need to be hand-written here.
      redpanda:
        seed_brokers:
          - redpanda-0.redpanda.redpanda.svc.cluster.local.:9093
        sasl:
          - mechanism: ${REDPANDA_SASL_MECHANISM}
            username: ${REDPANDA_SASL_USERNAME}
            password: ${REDPANDA_SASL_PASSWORD}
        topic: "mysql.shop.orders"
        key: '${! @table }'
    cache_resources:
      - label: mysql_cdc_orders_checkpoint
        memory: { default_ttl: 1h }
  replicas: 1

Two changes bundled to address two distinct review/CI signals:

1) Add `connectController.image.{repository,tag}` to the operator chart.

   New chart value plumbs through as `--connect-default-image` on the
   operator Deployment command. Pipeline.GetImage()'s precedence becomes:
     a. Pipeline.spec.image (per-Pipeline override, still highest)
     b. --connect-default-image (chart-level default, NEW)
     c. PipelineDefaultImage constant (binary-baked fallback)

   No CRD change. Existing Pipeline CRs continue to work unchanged. The
   chart value lets operators standardize the Connect runtime version
   across every pipeline without each Pipeline author remembering to set
   .spec.image. Unit-tested via TestRender_Deployment_ImagePrecedence
   (3 subtests, one per precedence tier).
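   The three-tier precedence can be sketched as follows (names and the
   default-image value are illustrative; the real Pipeline.GetImage()
   in the operator may differ):

   ```go
   package main

   import "fmt"

   // pipelineDefaultImage stands in for the binary-baked
   // PipelineDefaultImage constant; the value here is illustrative.
   const pipelineDefaultImage = "docker.redpanda.com/redpandadata/connect:latest"

   // getImage sketches the three-tier precedence described above.
   func getImage(specImage, connectDefaultImage string) string {
   	if specImage != "" {
   		return specImage // a. per-Pipeline override wins
   	}
   	if connectDefaultImage != "" {
   		return connectDefaultImage // b. chart-level --connect-default-image
   	}
   	return pipelineDefaultImage // c. compiled-in fallback
   }

   func main() {
   	fmt.Println(getImage("", "connect:4.92.0")) // chart default applies
   }
   ```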

2) Relax the "userRef required when cluster.clusterRef is set" CEL.

   That rule rejected the existing pipeline-crds.feature acceptance
   scenarios, which use clusterRef against an unauthenticated cluster
   (the test's `basic` cluster has SASL disabled). This commit drops the
   strict rule and keeps the two rules that defend real correctness
   invariants:
     - userRef must NOT be set alongside staticConfiguration
     - userRef must NOT be set without clusterRef
   userRef is now opt-in for SASL-enabled clusters. Updated the UserRef
   docstring to spell out the new semantics and reverted the
   stub-userRef workaround in
   TestReconcile_InvalidClusterRefCleansUpManagedResources.

Pipeline render layer is already correct: the auto-generated `redpanda:`
block's `sasl:` section is gated on `r.userCredentials != nil`, so a
clusterRef-only Pipeline gets a SASL-less redpanda block (brokers + TLS
only) — exactly what the acceptance tests want.
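The two retained rules could be expressed as kubebuilder CEL markers along these lines (a sketch only — the operator's actual field paths, guards, and messages may differ):

```go
// Sketch; not the operator's actual markers.
// +kubebuilder:validation:XValidation:rule="!has(self.userRef) || !has(self.cluster) || !has(self.cluster.staticConfiguration)",message="userRef cannot be combined with cluster.staticConfiguration"
// +kubebuilder:validation:XValidation:rule="!has(self.userRef) || (has(self.cluster) && has(self.cluster.clusterRef))",message="userRef requires cluster.clusterRef"
type PipelineSpec struct {
	// ...
}
```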

Verified locally:
  go test ./operator/internal/controller/pipeline/ -timeout 5m  → ok
  go test ./operator/chart/                                      → ok
  task lint                                                       → clean

The remaining acceptance failures in build #13661 (migration,
console-upgrades, operator-upgrades, *-crds vectorized variants) are
pre-existing cert-manager-webhook-race infra flakes — not PR 1337's
doing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@david-yu requested a review from andrewstucki May 16, 2026 03:15
…o output.redpanda

Two related design changes on top of the v2 Pipeline spec:

- Add Pipeline.spec.serviceAccountName so the IRSA / Workload Identity
  trust boundary can be scoped per-pipeline instead of relying on the
  namespace's default ServiceAccount. The operator does not create the
  SA; users provision it with cloud-IAM annotations out-of-band.

- Drop the auto-generated top-level `redpanda:` block (which only fed
  the deprecated `redpanda_common` plugin). When the Pipeline binds to
  a cluster, the operator now inline-merges seed_brokers, tls, and sasl
  into any input.redpanda / output.redpanda blocks in the user's
  configYaml. Users write topic / key / consumer_group; the operator
  fills in the connection plumbing. User-supplied keys still win on
  conflict.

Tests cover the four merge cases (output, input, user-wins, no-plugin),
guard against the redpanda_common block re-appearing, verify the
fully-inline pipeline still passes through untouched, and exercise the
new serviceAccountName field both set and unset.
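The user-wins merge described above can be sketched over decoded generic maps (helper name and value shapes are illustrative, not the operator's code):

```go
package main

import "fmt"

// mergeRedpandaBlock sketches the inline merge: the operator supplies
// connection plumbing (seed_brokers, tls, sasl); user-authored keys in
// the same input.redpanda / output.redpanda block win on any conflict.
func mergeRedpandaBlock(userBlock, operatorFields map[string]any) map[string]any {
	merged := make(map[string]any, len(userBlock)+len(operatorFields))
	for k, v := range operatorFields {
		merged[k] = v
	}
	for k, v := range userBlock { // user-supplied keys override
		merged[k] = v
	}
	return merged
}

func main() {
	user := map[string]any{
		"topic":        "mysql.shop.orders",
		"seed_brokers": []string{"my-own-broker:9092"}, // conflicts on purpose
	}
	op := map[string]any{
		"seed_brokers": []string{"redpanda-0.redpanda.redpanda.svc.cluster.local.:9093"},
		"tls":          map[string]any{"enabled": true},
	}
	merged := mergeRedpandaBlock(user, op)
	fmt.Println(merged["seed_brokers"]) // user's brokers survive the merge
}
```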

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…" keyword

The ConfigYAML docstring described `redpanda_common` as "deprecated" and
`redpanda` as "non-deprecated". The `rp-controller-gen deprecations`
generator scans v1alpha2 godoc for the word "deprecated" and adds any
hit to zz_generated.deprecations_test.go's TODO list. The build's
ci:lint step (which runs `task generate` and then `git diff
--exit-code`) failed because CI's regeneration added Pipeline.ConfigYAML
to that TODO list while the committed file didn't have it.

Rephrased to "the merge targets the `redpanda` input/output plugins
specifically; `redpanda_common` blocks are passed through unchanged."
Same semantics, no trigger word.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>